package com.app.store;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

import com.app.db.DaoUtil;
import com.app.exception.DCacheException;
import com.app.model.ColumnInfo;
import com.app.model.TableInfo;
import com.app.util.JSONUtil;
import com.app.util.StringUtils;

/**
 * MySQL-backed store that keeps one database column per entity attribute.
 *
 * <p>The class synchronises the database schema with the supplied
 * {@link TableInfo} definitions (creating tables, adding / modifying / dropping
 * columns, indexes and primary keys), caches the resulting table metadata,
 * converts query results to JSON strings and writes entities to the database
 * (or the update queue) and to the cache client inherited from the superclass.
 */
public class ColumnStore extends AbstractMySqlStore {

	private static final Logger logger = Logger.getLogger(ColumnStore.class);

	/** MySQL server error code ER_NO_SUCH_TABLE ("Table doesn't exist"). */
	private static final int MYSQL_ER_NO_SUCH_TABLE = 1146;

	ColumnStore() {
		super(COLUM_STORE);
	}

	/**
	 * Synchronises the database schema with the given table definitions and then
	 * refreshes the cached table metadata. Only the first call does any work;
	 * later calls return immediately.
	 *
	 * @param tableInfos desired table definitions
	 * @throws DCacheException if a connection cannot be obtained or a schema
	 *         change fails
	 */
	@Override
	public void initTables(List<TableInfo> tableInfos) throws DCacheException {
		if (init) {
			return;
		}
		// NOTE(review): the flag is raised before the work is done, so a failed
		// initialisation is not retried by a later call - confirm this is intended.
		init = true;
		Connection conn = null;
		try {
			conn = pool.getConnection();
			for (TableInfo tableInfo : tableInfos) {
				initTable(tableInfo, conn);
			}
		} catch (SQLException e) {
			logger.error("init tables failed: " + e.getMessage(), e);
			throw new DCacheException(e);
		} finally {
			DaoUtil.close(conn);
		}
		cacheTables();
	}

	/**
	 * Brings one database table in line with its definition: adds or modifies
	 * columns, adjusts the per-column index and the primary key, and drops
	 * columns that are no longer defined. If the table does not exist yet it is
	 * created instead.
	 *
	 * @param tableInfo desired table definition
	 * @param conn open connection used for all statements
	 * @throws DCacheException if any schema statement fails
	 */
	private void initTable(TableInfo tableInfo, Connection conn) {
		try {
			String tableName = tableInfo.getTableName();
			String alterPrefix = "ALTER TABLE `" + tableName + "`";

			// Columns as they currently exist in the database, keyed by column name.
			Map<String, ColumnInfo> dbcols = getColumnInfo(tableName, conn);

			for (ColumnInfo ci : tableInfo.getColInfos()) {
				String colName = ci.getName();
				ColumnInfo db = dbcols.get(colName);

				// Column itself: add when missing, modify when the definition differs.
				String columnOp = null;
				if (db == null) {
					columnOp = " ADD COLUMN ";
				} else if (ci.changed(db)) {
					columnOp = " MODIFY COLUMN ";
				}
				if (columnOp != null) {
					StringBuilder sb = new StringBuilder(alterPrefix);
					sb.append(columnOp);
					appendColumnDefinition(sb, ci);
					executeSchemaChange(conn, sb.toString());
					logger.info(tableName + " is changed " + columnOp + colName);
				}

				// Secondary index: act only when the index flag differs from the database.
				// NOTE(review): whether isIndex() is also true for primary-key columns
				// read from the database depends on getColumnInfo - confirm.
				boolean hadIndex = db != null && db.isIndex();
				if (hadIndex != ci.isIndex()) {
					String indexOp = null;
					String indexSql = null;
					if (!ci.isIndex()) {
						indexOp = " DROP INDEX ";
						// DROP INDEX takes the index name only, never a column list.
						indexSql = alterPrefix + indexOp + " index_" + colName;
					} else if (!ci.isKey()) {
						// Key columns get no index_<name> index of their own.
						indexOp = " ADD INDEX ";
						indexSql = alterPrefix + indexOp + " index_" + colName + "(`" + colName + "`)";
					}
					if (indexSql != null) {
						executeSchemaChange(conn, indexSql);
						logger.info(tableName + " column is changed " + indexOp + colName);
					}
				}

				// Primary key: act only when the key flag differs from the database.
				boolean hadKey = db != null && db.isKey();
				if (hadKey != ci.isKey()) {
					String keySql;
					if (ci.isKey()) {
						keySql = alterPrefix + " ADD PRIMARY KEY(`" + colName + "`)";
					} else {
						// DROP PRIMARY KEY takes no column list.
						keySql = alterPrefix + " DROP PRIMARY KEY";
					}
					executeSchemaChange(conn, keySql);
					logger.info(tableName + " primary key is changed " + colName);
				}

				dbcols.remove(colName);
			}

			// Whatever is still left exists only in the database: drop it.
			for (ColumnInfo stale : dbcols.values()) {
				executeSchemaChange(conn, alterPrefix + " DROP COLUMN `" + stale.getName() + "`");
				logger.info(tableName + " column droped:" + stale.getName());
			}
		} catch (SQLException e) {
			if (e.getErrorCode() == MYSQL_ER_NO_SUCH_TABLE) {
				// The table does not exist yet: create it from the definition.
				createTable(tableInfo, conn);
			} else {
				logger.error("init table failed: " + e.getMessage(), e);
				throw new DCacheException(e.getMessage(), e);
			}
		} catch (DCacheException e) {
			// Already the exception type callers expect; do not wrap it again.
			throw e;
		} catch (Exception e) {
			logger.error("init table failed: " + e.getMessage(), e);
			throw new DCacheException(e.getMessage(), e);
		}
	}

	/**
	 * Appends "`name` type [DEFAULT ...] NULL|NOT NULL [extra]" for one column.
	 * Defaults are quoted only for types starting with "varchar".
	 */
	private void appendColumnDefinition(StringBuilder sql, ColumnInfo ci) {
		sql.append("`");
		sql.append(ci.getName());
		sql.append("` ");
		sql.append(ci.getType());
		sql.append(" ");
		if (!StringUtils.isNullOrEmpty(ci.getDef())) {
			if (ci.getType().startsWith("varchar")) {
				sql.append("DEFAULT '");
				sql.append(ci.getDef());
				sql.append("'");
			} else {
				sql.append("DEFAULT ");
				sql.append(ci.getDef());
			}
		}
		sql.append(ci.isNullable() ? " NULL " : " NOT NULL ");
		if (!StringUtils.isNullOrEmpty(ci.getExtra())) {
			sql.append(" ");
			sql.append(ci.getExtra());
		}
	}

	/** Executes one schema statement and always closes the statement afterwards. */
	private void executeSchemaChange(Connection conn, String sql) throws SQLException {
		PreparedStatement ps = null;
		try {
			ps = conn.prepareStatement(sql);
			ps.executeUpdate();
		} finally {
			DaoUtil.close(ps);
		}
	}

	/**
	 * Reloads the metadata of every table reported by "show tables" into the
	 * shared {@code tables} map and removes entries for tables that no longer
	 * exist.
	 *
	 * @return the refreshed {@code tables} map
	 * @throws DCacheException if the metadata cannot be read
	 */
	@Override
	public Map<String,TableInfo> cacheTables() throws DCacheException {
		String sql = "show tables";
		Connection conn = null;
		PreparedStatement ps = null;
		ResultSet rs = null;
		try {
			Map<String,TableInfo> found = new ConcurrentHashMap<String,TableInfo>();
			conn = pool.getConnection();
			ps = conn.prepareStatement(sql);
			rs = ps.executeQuery();
			while (rs.next()) {
				String tableName = rs.getString(1);

				TableInfo tableInfo = new TableInfo();
				// presumably a new TableInfo may be pre-populated with columns - confirm
				tableInfo.getColInfos().clear();
				tableInfo.addAllColInfo(getColumnInfo(tableName, conn).values());

				// The first column flagged as key becomes the table key.
				for (ColumnInfo ci : tableInfo.getColInfos()) {
					if (ci.isKey()) {
						tableInfo.setKey(ci);
						break;
					}
				}

				tableInfo.setTableName(tableName);
				found.put(tableName, tableInfo);
			}

			tables.putAll(found);
			// Drop cached tables that vanished from the database. retainAll removes
			// through the key-set view, so no entry is removed from the map while
			// this code is iterating over it.
			tables.keySet().retainAll(found.keySet());
		} catch (DCacheException e) {
			// Already the exception type callers expect; do not wrap it again.
			throw e;
		} catch (Exception e) {
			logger.error(e.getMessage() + " sql:" + sql, e);
			throw new DCacheException(e);
		} finally {
			DaoUtil.close(rs);
			DaoUtil.close(ps);
			DaoUtil.close(conn);
		}
		return tables;
	}

	/**
	 * Creates the table described by {@code tableInfo}: InnoDB, utf8/utf8_bin,
	 * partitioned by the key column.
	 *
	 * @param tableInfo table definition; needs at least one column and a key column
	 * @param conn open connection used for the statement
	 * @throws DCacheException if the definition is unusable or the statement fails
	 */
	private void createTable(TableInfo tableInfo, Connection conn) {
		List<ColumnInfo> cols = tableInfo.getColInfos();
		if (cols == null || cols.isEmpty()) {
			throw new DCacheException(new IllegalArgumentException(
					"cannot create table " + tableInfo.getTableName() + ": no columns defined"));
		}

		StringBuilder sql = new StringBuilder();
		sql.append("CREATE TABLE IF NOT EXISTS `");
		sql.append(tableInfo.getTableName());
		sql.append("`");
		sql.append(" (");

		String key = null;
		boolean first = true;
		for (ColumnInfo ci : cols) {
			if (!first) {
				sql.append(",");
			}
			first = false;
			appendColumnDefinition(sql, ci);
			// NOTE(review): more than one key column would emit several PRIMARY KEY
			// clauses, which MySQL rejects - a single key column is assumed.
			if (ci.isKey()) {
				key = ci.getName();
				sql.append(", PRIMARY KEY (`");
				sql.append(key);
				sql.append("`)");
			} else if (ci.isIndex()) {
				sql.append(", INDEX index_");
				sql.append(ci.getName());
				sql.append("(`");
				sql.append(ci.getName());
				sql.append("`)");
			}
		}

		if (key == null) {
			// The partition clause below names the key column, so one is required.
			throw new DCacheException(new IllegalArgumentException(
					"cannot create table " + tableInfo.getTableName() + ": no key column defined"));
		}

		sql.append(") ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin PARTITION BY KEY (`"
				+ key + "`) PARTITIONS " + cfg.getTablePartition());
		String ddl = sql.toString();
		try {
			executeSchemaChange(conn, ddl);
			logger.info("create table :" + tableInfo.getTableName());
		} catch (Exception e) {
			logger.error(e.getMessage() + " sql:" + ddl, e);
			throw new DCacheException(e);
		}
	}

	/**
	 * Converts every remaining row of {@code rs} into a JSON object string whose
	 * fields follow the cached column order of {@code tableName}. The column named
	 * like {@code ColumnInfo.LAST_TIME} and SQL NULL values are left out; byte
	 * arrays are decoded as UTF-8 text.
	 *
	 * @param tableName name of a table present in the cached metadata
	 * @param rs result set positioned before the first row to convert
	 * @return one JSON string per row, in result-set order
	 * @throws DCacheException if the table is unknown or a row cannot be read
	 */
	@Override
	protected List<String> converResult2Json(String tableName,ResultSet rs) throws DCacheException {
		List<String> ret = new ArrayList<String>();
		try {
			TableInfo table = tables.get(tableName);
			if (table == null) {
				throw new DCacheException(new IllegalArgumentException(
						"table is not cached: " + tableName));
			}
			List<ColumnInfo> cols = table.getColInfos();
			while (rs.next()) {
				Map<String,Object> row = new LinkedHashMap<String, Object>();
				for (ColumnInfo ci : cols) {
					if (ci.getName().equals(ColumnInfo.LAST_TIME.getName())) {
						continue;
					}
					Object obj = rs.getObject(ci.getName());
					if (obj == null) {
						continue;
					}
					// Only byte arrays are decoded; other values are passed through as-is.
					if (obj instanceof byte[]) {
						obj = new String((byte[]) obj, "UTF-8");
					}
					row.put(ci.getName(), obj);
				}
				ret.add(JSONUtil.toJSON(row));
			}
		} catch (DCacheException e) {
			// Already the exception type callers expect; do not wrap it again.
			throw e;
		} catch (Exception e) {
			logger.error(e.getMessage(), e);
			throw new DCacheException(e);
		}
		return ret;
	}

	/**
	 * Persists the changed attributes of one entity (directly to the database when
	 * {@code flushdb} is set, otherwise via the update queue) and stores
	 * {@code value} in the cache under the key generated for the table and id.
	 * A failed cache write is only logged.
	 *
	 * @param tableName target table
	 * @param id entity id
	 * @param value value stored in the cache
	 * @param indexAttr not used by this implementation
	 * @param indexValue not used by this implementation
	 * @param changed changed attributes handed to the database write / update queue
	 * @param flushdb {@code true} to write to the database immediately
	 * @return 1 on success
	 * @throws DCacheException if persisting or caching throws
	 */
	@Override
	public int putEntity(String tableName, long id, String value,
			List<String> indexAttr, List<String> indexValue,
			LinkedHashMap<String, String> changed, boolean flushdb)
			throws DCacheException {
		try {
			if (flushdb) {
				putDbValue(tableName, changed);
			} else {
				// Defer the database write: hand the change to the update queue.
				putUpdateQueue(tableName, id, changed);
			}

			String memKey = this.genMemcKey(tableName, id);
			boolean cached = client.set(memKey, value, DEFAULT_EXPIRE);
			if (!cached) {
				logger.error("数据存储到缓存失败");
			}

			putTimes.getAndAdd(1);
			if (logger.isDebugEnabled()) {
				// Per-table put counter, maintained only while debug logging is on.
				AtomicLong counter = new AtomicLong();
				AtomicLong existing = put.putIfAbsent(tableName, counter);
				if (existing != null) {
					counter = existing;
				}
				counter.addAndGet(1);
			}
			return 1;
		} catch (DCacheException e) {
			// Already the exception type callers expect; do not wrap it again.
			throw e;
		} catch (Exception e) {
			logger.error("put entity failed, table:" + tableName + " id:" + id, e);
			throw new DCacheException(e);
		}
	}

	/**
	 * Stores several entities of one table by calling
	 * {@code putEntity} once per key, in list order. The lists are read in
	 * parallel by position, so each must hold at least {@code keyList.size()}
	 * elements. Entities written before a failure stay written.
	 *
	 * @throws DCacheException if any single put fails
	 */
	@Override
	public void putEntityList(String tableName, List<Long> keyList,
			List<String> valueList, List<String> indexAttr,
			List<List<String>> indexValueList,
			List<LinkedHashMap<String, String>> changedList, boolean flushdb)
			throws DCacheException {
		int count = keyList.size();
		for (int i = 0; i < count; i++) {
			putEntity(tableName, keyList.get(i), valueList.get(i), indexAttr,
					indexValueList.get(i), changedList.get(i), flushdb);
		}
	}

	/**
	 * Stores several entities that may belong to different tables by calling
	 * {@code putEntity} once per position, in list order. The lists are read in
	 * parallel by position, so each must hold at least
	 * {@code tableNameList.size()} elements. Entities written before a failure
	 * stay written.
	 *
	 * @throws DCacheException if any single put fails
	 */
	@Override
	public void putMultiEntityList(List<String> tableNameList,
			List<Long> keyList, List<String> valueList,
			List<List<String>> indexAttrList,
			List<List<String>> indexValueList,
			List<LinkedHashMap<String, String>> changedList, boolean flushdb)
			throws DCacheException {
		int count = tableNameList.size();
		for (int i = 0; i < count; i++) {
			putEntity(tableNameList.get(i), keyList.get(i), valueList.get(i),
					indexAttrList.get(i), indexValueList.get(i), changedList.get(i), flushdb);
		}
	}
}
